/*
 * 文件名：KafkaConsumer.java
 * 版权：Copyright by Layne
 * 描述：
 * 修改人：Layne
 * 修改时间：2019年5月17日
 * 跟踪单号：
 * 修改单号：
 * 修改内容：
 */

package com.bicycle.access.consumer;

import com.bicycle.access.process.DataToMySqlService;
import com.bicycle.access.thread.StatictisThread;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class KafkaConsumer {
    @Autowired
    private DataToMySqlService dataToMySqlService;
    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(8,8,20, TimeUnit.SECONDS,new SynchronousQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());//启用多线程

    //"JR_MB", "JR_QJ"
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"0"})})
    public void listen(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"1"})})
    public void listen1(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"2"})})
    public void listen2(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"3"})})
    public void listen3(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"4"})})
    public void listen4(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"5"})})
    public void listen5(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"6"})})
    public void listen6(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "JR_HL", partitions = {"7"})})
    public void listen7(ConsumerRecord<Object, Object> consumerRecord) {
        executor.execute(new StatictisThread(consumerRecord,dataToMySqlService));
    }


    @KafkaListener(topics = {"TS_MB", "TS_MT", "TS_QJ", "TS_HL"})
    public void areaListen(ConsumerRecord<Object, Object> consumerRecord) {
        try {
            log.info(">>>>>>Consumer:topic:" + consumerRecord.topic() + ",offset:" + consumerRecord.offset() + ",key:" + consumerRecord.key() + ",value:" + consumerRecord.value());
        } catch (Exception e) {
            log.error(">>>>>>>>>>>>>>>>>>消费错误error:", e);
            log.error(">>>>>>消费错误Consumer:topic:" + consumerRecord.topic() + ",offset:" + consumerRecord.offset() + ",key:" + consumerRecord.key() + ",value:" + consumerRecord.value());
        }
    }

}
